Netty的UDP广播发送与接收

您所在的位置:网站首页 udp广播包 如何发送和接收 Netty的UDP广播发送与接收

Netty的UDP广播发送与接收

2024-07-10 08:19| 来源: 网络整理| 查看: 265

想要的逻辑是这样的: A向局域网内发送广播消息messageA; B收到了messageA并直接使用既有的session或channel把需要回复的消息write回来就行了。

自己尝试了一下,记载一下使用中较为便利的写法。

客户端一般是这样写:

EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(eventLoopGroup) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new NormalUDPClientHandler()); bootstrap.bind(9999).sync().channel().closeFuture().await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); }

就像创建一个普通的客户端一样,不过channel传入的通道类型为NioDatagramChannel,使用的是bind而非connect,以及option传入的网络选项为ChannelOption.SO_BROADCAST。

上面这几行代码在《Netty权威指南》第十二章有类似出现,但原文是bind(0),这里是bind(9999),毕竟真实场景中还是很少拿0作为接收端口。或者说,原文想表达的意思就是这里的bootstrap只作为发送方存在,不接收回复。

初次看这段代码时被这个bind(0)误导了,以为是发送UDP必须如此…后面发现并不是,bind(0)代表着发送方会随机选择一个端口去发送UDP,类似于”万能端口“的概念,如果绑定一个合理值,那么发送端口就会被固定下来。 另外,区分客户端与服务端的界限并不明显,尤其是在单对单的情况下,在单对多的情况下,可以由哪一方实现了childrenHandler接口为服务端。

但其实发送UDP的重点除了上述所说的通道类型与网络选项外,还在于发送的数据包的特殊,与bind(0)关系不大。这个数据包也就是DatagramPacket,这里在handler中体现:

public class NormalUDPClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { L.d(ctx.channel().remoteAddress() + ""); ctx.executor().parent().execute(new Runnable() { @Override public void run() { for (int i = 0; i :/192.168.0.104:9999,我在广播0 20:01:45.045 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播1 20:01:47.047 (NormalUDPServerHandler.java:26)#channelRead-->:/192.168.0.104:9999,我在广播2

发送端的输出为:

20:01:43.043 (NormalUDPClientHandler.java:19)#channelActive-->:null 20:01:43.043 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播0 20:01:45.045 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播1 20:01:47.047 (NormalUDPClientHandler.java:48)#channelRead-->:/192.168.0.104:10000,我在广播2 疑问

问题:

客户端启动后是怎么进入channelActive来发送广播的?channelActive什么时候才会触发吗?客户端是怎么实现自己关闭的?调用ctx.close后会怎样?

其实都可以通过了解源码得知,前面的逻辑我们不再赘述,有兴趣的话可以通过另一篇从Netty源码看心跳超时机制大致了解一番。

启动

我们先来看这个UDP的客户端是怎么”启动“的。 首先注意到上文记载的UDP广播无论是发送方还是接收方其实都是用的bind方式,以传统理解其实都是作为服务端存在的,但在netty中这样的概念其实是不明显的,只考虑"端对端"的情况是可以不区分谁是客户端谁是服务端的; 当然这也是因为UDP和TCP不同,是无连接的,所以才可以都使用bind使自己固定地址和端口;

大致调用流程都是一致的,无论是客户端还是服务端都会先创建channel并注册到某线程,然后处理连接事项后补全channel的状态与信息:

AbstractBootstrap--> AbstractChannel--> AbstractChannelHandlerContext--> DefaultChannelPipeline--> AbstractChannel$AbstractUnsafe

和看心跳那篇博客的时序图基本一致,还是先关注AbstractUnsafe中的register0方法:

private void register0(ChannelPromise promise) { try { // ... pipeline.fireChannelRegistered();//1必定调用 System.out.println("register0->isActive="+isActive()); System.out.println("register0->firstRegistration="+firstRegistration); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive();//2看条件 } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // ... } }

其中有fireChannelRegistered和fireChannelActive,前者是必定调用的,而后者需要看isActive的状态,那么TCP建立和UDP建立会有所不同么? 这里当然是相同的,因为尚在注册阶段,一般情况下isActive均为false,

register0->isActive=false register0->firstRegistration=true

所以fireChannelActive在这两种情况均不会调用。 那么再看AbstractUnsafe的bind方法,建立服务使用bind最终都会调用到这里来:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { // ... boolean wasActive = isActive(); System.out.println("bind->isActive="+isActive()); System.out.println("bind->wasActive="+wasActive); try { doBind(localAddress);//与本地地址绑定 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } System.out.println("bind->after doBind"); System.out.println("bind->isActive="+isActive()); System.out.println("bind->wasActive="+wasActive); if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive();//1取决于两个判断标记 } }); } safeSetSuccess(promise); }

可以看到,doBind方法才是真正的绑定服务,而其后会通过wasActive标记与isActive状态的值来决定是否调用fireChannelActive。 那么这一步TCP服务和UDP服务两者会有所不同么?

TCP服务bind bind->isActive=false bind->wasActive=false bind->after doBind bind->isActive=true bind->wasActive=false --------------------- UDP服务bind bind->isActive=false bind->wasActive=false bind->after doBind bind->isActive=true bind->wasActive=false

还是一样的,没想到吧。 所以就算是调用bind方法,仍然会回调到handler的channelActive方法,这是匆庸置疑的,但我们一般情况下建立TCP服务端的时候并不会为其“自身"添加handler,而是监听连接使用的childrenHandler,所以往往会忽略:

ServerBootstrap serverBootstrap = new ServerBootstrap() .group(boos, worker) .channel(NioServerSocketChannel.class) //.handler(new NormalServerHandler())//看不见我 .childOption(ChannelOption.TCP_NODELAY,true) .childHandler(new ChannelInitializer() { protected void initChannel(Channel ch) { ch.pipeline().addLast(new NormalServerHandler()); } });

如果将上面被注释的这一行放开,那么就算只是服务端自己启动,也仍然会调用其中的channelActive方法,表示自身已经处于激活状态了。 所以至此应该可以回答第一个问题: UDP的客户端和服务端其实本质上都是传统意义上的服务端写法,均使用bind,而bind成功后是必定要调用handler()所添加的Handler的channelActive方法的,和是否有channel连接是无关的; childrenHandler()添加的Handler才会在有channel连接时调用其对应的channelActive方法; 或者更进一步的理解: 无论是服务端还是客户端,无论是bind或是connect,在netty中都会先创建一个channel,再使用channel的相关方法去操作,而只要这个channel被创建成功,那么肯定会调用相应channelActive方法的。

//connect private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { ChannelFuture regFuture = this.initAndRegister(); final Channel channel = regFuture.channel(); } //bind private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); }

如果有不能进入channelRead的情况应该还是哪里写法有误,UDP这里的写法其实并不分客户端与服务端,全凭

new DatagramPacket(Unpooled.copiedBuffer("我在广播" + i, Charset.forName("utf-8")), new InetSocketAddress("255.255.255.255", 10001))

这样的包中的地址与端口来判断是否能发送,而channelRead和是否手动调用ctx.fireChannelActive()是无关的,服务会在自身绑定建立后就一直在自身的NioEventLoop线程的循环监听消息,所以应该不会影响。

关闭

如果这个”关闭“是指上文中UDP服务的关闭的话,其实客户端和服务端都是以服务形式启动的,所以关闭这个channel(通道)和关闭服务是一个意思。当然,这里建议把“channel”扩展理解为”端”,而非单纯的通道。 如果是指其他场景下,比如不挂起的情况下自动关闭:

bootstrap.bind(10000).sync().channel(); bootstrap.bind(10000).sync().channel().closeFuture().await();

前者是自动关闭的,后者是手动挂起,但其实自动关闭和手动关闭走的流程是一样的,只是自动关闭是由系统调用了手动关闭。 我们先看手动关闭。 如果在服务激活后5秒左右手动关闭是怎样的流程呢?

@Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { System.out.println("handler active->"+ctx.channel().remoteAddress()+""); new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(5); System.out.println("----"); ctx.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }

ChannelHandlerContext可以看作对channel的一个包装,任意一端的建立都会产生一个新的channel,关闭了当前ChannelHandlerContext,也就意味着将当前端关闭了,所以服务也关闭了。 具体关闭源码:

private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { //... Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. doClose0(promise); } finally { invokeLater(new Runnable() { @Override public void run() { if (outboundBuffer != null) { // Fail all the queued messages outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { //... } } private void doClose0(ChannelPromise promise) { try { doClose(); closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } }

还是会由AbstractChannel$AbstractUnsafe接管,调用其中的close方法,然后调用doClose0,最后调用这个doClose(); 这个doClose()会由新建时传入的channel种类确定,去调用各自类中的相应方法,

.channel(NioDatagramChannel.class)

UDP建立时使用的NioDatagramChannel.class,于是会回调NioDatagramChannel中的对应方法:

@Override protected void doClose() throws Exception { javaChannel().close(); }

最终使用JDK中的关闭方法去关闭这个channel。 另外,如果我们不挂起的情况下,其实和手动关闭走过的方法是一样的,不同之处在于在不挂起的情况下,框架内有一套检测机制会去调用关闭,类似在NioEventLoop中:

@Override protected void run() { for (;;) { //... try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } private void closeAll() { //... for (AbstractNioChannel ch: channels) { ch.unsafe().close(ch.unsafe().voidPromise()); } }

当一个channel被建立后,注册到线程池,其实线程就会一直跑着,一直去判断当前是否需要关闭; 一旦检测到需要关闭,就会调用

ch.unsafe().close

方法去关闭channel,这和ctx.close最终调用到的方法是一样的,都是AbstractUnsafe接管; 而isShuttingDown()中的状态会由线程池启动后就开始一直更新自己的状态,只要在这个线程池内还有任务存在,那么就能“苟活”,否则就会马上被判定为ST_SHUTTING_DOWN ,就开始步入“死亡”了。 可以在SingleThreadEventExecutor中找到相关的源码:

private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { //... try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { FastThreadLocal.removeAll(); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); if (logger.isWarnEnabled() && !taskQueue.isEmpty()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }

所以如果在handler里面强行挂个sleep,也是可以延长这个channel的寿命,因为这样属于“任务尚未处理完毕“。 至此,应该能大致回答第2个问题。 客户端(服务端)如果不挂起await或其他耗时任务,框架会去自动检测并关闭channel; ctx.close其实就是关闭channel,channel不止于通道的意思,它就是端,服务端把自己的端关闭了,服务自然就关了。

以上。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3